package org.zjvis.datascience.web.controller;

import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.zjvis.datascience.common.constant.URLConstant;
import org.zjvis.datascience.common.model.ApiResult;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.vo.JobStatusVO;
import org.zjvis.datascience.jobserver.service.JobService;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;

/**
 * @description Job management REST controller. Exposes endpoints under {@code /job} that
 * delegate to {@link JobService} for submitting, querying, killing jobs, managing
 * job-server contexts and streaming job logs.
 * @date 2021-09-22
 */
@Api(tags = "任务管理")
@RestController
@RequestMapping("/job")
public class JobSubmitController {

    // The logger name is kept as the original literal so existing logging configuration still matches.
    private static final Logger logger = LoggerFactory.getLogger("JobSubmitController");

    /** JSON request-body keys read by the endpoints below. */
    private static final String PARAM_APPLICATION_ID = "applicationId";
    private static final String PARAM_APP_ARGS = "appArgs";
    private static final String PARAM_APP_RESOURCE_PATH = "appResourcePath";
    private static final String PARAM_SCRIPT_BODY = "scriptBody";
    private static final String PARAM_PAGE = "page";

    /** Context name that {@link #createContext(JSONObject)} always uses. */
    private static final String DEFAULT_CONTEXT_NAME = "datascience-context0";

    @Autowired
    private JobService jobService;

    /**
     * Queries the status of an application and returns it as a JSON string.
     *
     * @param params request body; must contain a non-empty {@code applicationId}
     * @return the serialized {@link JobStatusVO} on success, {@code PARAM_ERROR} when
     *         {@code applicationId} is missing or empty, {@code SYS_ERROR} when the lookup
     *         or the report conversion throws
     */
    @PostMapping(value = "/queryJobStatus")
    @ApiOperation(value = "查询任务运行状态", notes = "根据applicationId查询")
    public ApiResult<String> queryJobStatus(@RequestBody JSONObject params) {
        String applicationId = params.getString(PARAM_APPLICATION_ID);
        if (StringUtils.isEmpty(applicationId)) {
            logger.error("API /job/queryJobStatus failed, since {}", "applicationId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        JobStatusVO jobStatusVO;
        try {
            ApplicationReport report = jobService.queryJobStatus(applicationId, null);
            jobStatusVO = new JobStatusVO();
            jobStatusVO.setId(report.getApplicationId().toString());
            jobStatusVO.setState(report.getYarnApplicationState().toString());
            jobStatusVO.setFinalStatus(report.getFinalApplicationStatus().toString());
            jobStatusVO.setProgress(report.getProgress());
            jobStatusVO.setDiagnostics(report.getDiagnostics());
            jobStatusVO.setStartedTime(report.getStartTime());
            jobStatusVO.setFinishedTime(report.getFinishTime());
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the full stack trace.
            logger.error("API /job/queryJobStatus failed, applicationId={}, message={}",
                    applicationId, e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        return ApiResult.valueOf(JSONObject.toJSONString(jobStatusVO));
    }

    /**
     * Stops the application identified by {@code vo.getId()}.
     *
     * @param vo request body, validated against the {@code JobStatusVO.Id} group
     * @return {@code SUCCESS} when the kill call returns normally, {@code SYS_ERROR} when it throws
     */
    @PostMapping("/killJob")
    @ApiOperation(value = "停止任务", notes = "根据applicationId停止任务")
    public ApiResult<ApiResultCode> killJob(@Validated(value = JobStatusVO.Id.class) @RequestBody JobStatusVO vo) {
        try {
            jobService.killApplication(vo.getId());
        } catch (Exception e) {
            logger.error("API /job/killJob failed, applicationId={}, message={}",
                    vo.getId(), e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    /**
     * Submits a job built from the given command-line arguments.
     *
     * @param params request body; must contain a non-empty {@code appArgs}
     * @return the value returned by {@code JobService#startApplication}, {@code PARAM_ERROR}
     *         when {@code appArgs} is missing or empty, {@code SYS_ERROR} on {@link IOException}
     */
    @PostMapping("/submitJob")
    @ApiOperation(value = "提交任务", notes = "传入算子的命令行参数")
    public ApiResult<String> submitJob(@RequestBody JSONObject params) {
        String appArgs = params.getString(PARAM_APP_ARGS);
        logger.info("API /job/submitJob entered, appArgs={}", appArgs);
        if (StringUtils.isEmpty(appArgs)) {
            logger.error("API /job/submitJob failed, since {}", "appArgs is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        String applicationId;
        try {
            applicationId = jobService.startApplication(appArgs);
        } catch (IOException e) {
            logger.error("API /job/submitJob failed, appArgs={}, message={}",
                    appArgs, e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        return ApiResult.valueOf(applicationId);
    }

    /**
     * Submits an ML job built from the given command-line arguments.
     *
     * @param params request body; must contain a non-empty {@code appArgs}
     * @return the value returned by {@code JobService#startMLApplication}, {@code PARAM_ERROR}
     *         when {@code appArgs} is missing or empty, {@code SYS_ERROR} on {@link IOException}
     */
    @PostMapping("/submitMLJob")
    @ApiOperation(value = "提交ML任务", notes = "传入命令行参数执行ML模型任务")
    public ApiResult<String> submitMLJob(@RequestBody JSONObject params) {
        String appArgs = params.getString(PARAM_APP_ARGS);
        logger.info("API /job/submitMLJob entered, appArgs={}", appArgs);
        if (StringUtils.isEmpty(appArgs)) {
            logger.error("API /job/submitMLJob failed, since {}", "appArgs is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        String applicationId;
        try {
            applicationId = jobService.startMLApplication(appArgs);
        } catch (IOException e) {
            logger.error("API /job/submitMLJob failed, appArgs={}, message={}",
                    appArgs, e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        return ApiResult.valueOf(applicationId);
    }

    /**
     * Submits a PySpark job.
     *
     * @param params request body; must contain non-empty {@code appArgs} and {@code appResourcePath}
     * @return the value returned by {@code JobService#startPySparkApplication}, {@code PARAM_ERROR}
     *         when either parameter is missing or empty, {@code SYS_ERROR} on {@link IOException}
     */
    @PostMapping("/submitPySparkJob")
    @ApiOperation(value = "提交pyspark任务", notes = "提交pyspark任务")
    public ApiResult<String> submitPySparkJob(@RequestBody JSONObject params) {
        // getString returns null for an absent key, so isEmpty covers both "missing" and "empty".
        String appArgs = params.getString(PARAM_APP_ARGS);
        if (StringUtils.isEmpty(appArgs)) {
            logger.error("API /job/submitPySparkJob failed, since {}", "appArgs is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        String appResourcePath = params.getString(PARAM_APP_RESOURCE_PATH);
        if (StringUtils.isEmpty(appResourcePath)) {
            logger.error("API /job/submitPySparkJob failed, since {}", "appResourcePath is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        logger.info("enter submitPySparkJob, appArgs={}", appArgs);
        String applicationId;
        try {
            applicationId = jobService.startPySparkApplication(appArgs, appResourcePath);
        } catch (IOException e) {
            logger.error("API /job/submitPySparkJob failed, appArgs={}, message={}",
                    appArgs, e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        return ApiResult.valueOf(applicationId);
    }

    /**
     * Validates the request and hands it to {@code JobService#writePyScriptToHDFS}.
     *
     * @param params request body; must contain non-empty {@code appResourcePath} and
     *               {@code scriptBody}; the whole object is passed to the service
     * @return {@code SUCCESS} when the service returns {@code true}, {@code SYS_ERROR} when it
     *         returns {@code false}, {@code PARAM_ERROR} when a required parameter is missing or empty
     */
    @PostMapping("/writePyScriptToHDFS")
    @ApiOperation(value = "写文件到hdfs存储系统", notes = "写文件到hdfs存储系统")
    public ApiResult<Void> writePyScriptToHDFS(@RequestBody JSONObject params) {
        if (StringUtils.isEmpty(params.getString(PARAM_APP_RESOURCE_PATH))) {
            logger.error("API /job/writePyScriptToHDFS failed, since {}", "appResourcePath is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (StringUtils.isEmpty(params.getString(PARAM_SCRIPT_BODY))) {
            logger.error("API /job/writePyScriptToHDFS failed, since {}", "scriptBody is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        logger.info("begin writePyScriptToHDFS.");
        boolean flag = jobService.writePyScriptToHDFS(params);
        if (!flag) {
            logger.error("API /job/writePyScriptToHDFS failed");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        logger.info("API /job/writePyScriptToHDFS success");
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    /**
     * Submits the given command-line arguments to the job server.
     *
     * @param params request body; must contain a non-empty {@code appArgs}
     * @return the string returned by {@code JobService#submitToJobServer}, or {@code PARAM_ERROR}
     *         when {@code appArgs} is missing or empty
     */
    @PostMapping("/runJob")
    @ApiOperation(value = "提交任务至jobServer运行", notes = "传入算子的命令行参数")
    public ApiResult<String> runJob(@RequestBody JSONObject params) {
        String appArgs = params.getString(PARAM_APP_ARGS);
        logger.info("enter runJob, appArgs={}", appArgs);
        if (StringUtils.isEmpty(appArgs)) {
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        String result = jobService.submitToJobServer(appArgs);
        return ApiResult.valueOf(result);
    }

    /**
     * Deletes a context on the job server.
     *
     * @param params request body; the context name is read from the key
     *               {@code URLConstant.CONTEXT_NAME} and passed through without validation
     * @return the string returned by {@code JobService#deleteContext}
     */
    @DeleteMapping("/deleteContext")
    @ApiOperation(value = "删除jobServer上的context", notes = "传入context名称")
    public ApiResult<String> deleteContext(@RequestBody JSONObject params) {
        String contextName = params.getString(URLConstant.CONTEXT_NAME);
        String result = jobService.deleteContext(contextName);
        return ApiResult.valueOf(result);
    }

    /**
     * Creates the context named {@link #DEFAULT_CONTEXT_NAME} on the job server.
     *
     * @param params request body; currently not read, the context name is fixed
     * @return the string returned by {@code JobService#createContext}
     */
    @PostMapping("/createContext")
    @ApiOperation(value = "创建jobServer上的context", notes = "传入context名称")
    public ApiResult<String> createContext(@RequestBody JSONObject params) {
        String result = jobService.createContext(DEFAULT_CONTEXT_NAME);
        return ApiResult.valueOf(result);
    }

    /**
     * Streams the log of an application directly into the HTTP response body.
     *
     * <p>Writes a plain-text error message instead of the log when {@code applicationId} is
     * missing or empty, or when fetching/printing the log throws.
     *
     * @param response servlet response whose writer receives the log text
     * @param params   request body; reads {@code applicationId} and {@code page}
     */
    @PostMapping("/getLog")
    @ApiOperation(value = "查询全量日志", notes = "根据applicationId查询")
    public void getLog(HttpServletResponse response, @RequestBody JSONObject params) {
        String applicationId = params.getString(PARAM_APPLICATION_ID);
        Integer page = params.getInteger(PARAM_PAGE);
        response.setCharacterEncoding("utf-8");
        PrintWriter writer = null;
        PrintStream ps = null;
        try {
            writer = response.getWriter();
            if (StringUtils.isEmpty(applicationId)) {
                writer.print("param error, applicationId cant't be null");
                writer.close();
                return;
            }
            // Adapt the response writer to the PrintStream that printLog is called with.
            OutputStream os = new WriterOutputStream(writer);
            ps = new PrintStream(os);
            jobService.printLog(applicationId, ps, page);
        } catch (Exception e) {
            logger.error("API /job/getLog failed, since {}", e.getMessage(), e);
            // writer is still null when response.getWriter() itself failed; guard against
            // a NullPointerException that would mask the original error.
            if (writer != null) {
                writer.print("system error");
            }
        } finally {
            if (ps != null) {
                ps.close();
            }
        }
    }

    /**
     * Queries applications on the cluster; handler for {@code /job/queryApps}.
     *
     * <p>NOTE(review): this is an overload named {@code getLog} although it serves
     * {@code /queryApps}; the name is kept so the class's interface stays unchanged.
     *
     * @param params request body, passed unchanged to {@code JobService#queryApps}
     * @return the service result wrapped in an {@link ApiResult}
     */
    @PostMapping("/queryApps")
    @ApiOperation(value = "查询集群上的application", notes = "默认查询未结束的")
    public ApiResult<JSONObject> getLog(@RequestBody JSONObject params) {
        return ApiResult.valueOf(jobService.queryApps(params));
    }

}
